package com.xinyue.mqsystem.mq;

import java.util.Collection;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import com.xinyue.mqsystem.exception.GameMQSendFailedException;

/**
 * Thin wrapper around {@link RocketMQTemplate} for sending raw {@code byte[]} payloads.
 *
 * <p>Destinations are built as {@code topic} or {@code topic:tags} (see
 * {@link #addTags(String, Collection)}).
 */
public class GameMQTemplate {

    private static final Logger logger = LoggerFactory.getLogger(GameMQTemplate.class);

    /** Separator placed between tags by {@link #convertTags(Collection)}. */
    private static final String TAG_SEPARATOR = " ||";

    private final RocketMQTemplate mqTemplate;

    /**
     * @param rocketMQTemplate the underlying template every send is delegated to
     */
    public GameMQTemplate(RocketMQTemplate rocketMQTemplate) {
        this.mqTemplate = rocketMQTemplate;
    }

    /**
     * Synchronously sends an ordered message without tags.
     *
     * @param topic   destination topic
     * @param data    message payload
     * @param hashKey key whose {@code toString()} is passed as the ordering hash key
     * @throws GameMQSendFailedException if the send status is not {@link SendStatus#SEND_OK}
     */
    public void syncSendOrderly(String topic, byte[] data, Object hashKey) throws GameMQSendFailedException {
        this.syncSendOrderly(topic, data, hashKey, null);
    }

    /**
     * Synchronously sends an ordered message, optionally tagged.
     *
     * @param topic   destination topic
     * @param data    message payload
     * @param hashKey key whose {@code toString()} is passed as the ordering hash key
     * @param tags    optional tags appended to the destination; {@code null} or empty means
     *                the bare topic is used
     * @throws GameMQSendFailedException if the send status is not {@link SendStatus#SEND_OK}
     */
    public void syncSendOrderly(String topic, byte[] data, Object hashKey, Collection<String> tags)
            throws GameMQSendFailedException {
        logger.trace("rocketmq send message size : {}", data.length);
        Message<byte[]> message = MessageBuilder.withPayload(data).build();
        // Previously the tags argument was silently dropped; build the destination the same
        // way asyncSendOrderly does so both paths agree.
        String destination = this.addTags(topic, tags);
        SendResult sendResult = mqTemplate.syncSendOrderly(destination, message, hashKey.toString());
        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
            logger.error("RocketMQ发送消息失败，{}", sendResult);
            throw new GameMQSendFailedException(sendResult.toString());
        }
    }

    /**
     * Builds the send destination: {@code topic:tags} when tags are present, otherwise
     * just {@code topic}.
     */
    private String addTags(String topic, Collection<String> tags) {
        if (tags == null || tags.isEmpty()) {
            return topic;
        }
        return topic + ":" + convertTags(tags);
    }

    /**
     * Joins the tags with {@code " ||"} between consecutive entries, e.g. {@code [a, b]}
     * becomes {@code "a ||b"}.
     *
     * @param tags tags to join; may be {@code null} or empty
     * @return the joined tag expression, or an empty string when there are no tags
     */
    public static String convertTags(Collection<String> tags) {
        // Guard: the old trailing-separator trim threw StringIndexOutOfBoundsException on
        // an empty collection.
        if (tags == null || tags.isEmpty()) {
            return "";
        }
        return String.join(TAG_SEPARATOR, tags);
    }

    /**
     * Asynchronously sends a message, optionally tagged; the outcome is only logged.
     *
     * <p>NOTE(review): despite the name, this delegates to {@code asyncSend} and takes no
     * hash key, so it does not request an orderly send. Confirm whether callers rely on
     * ordering before changing it.
     *
     * @param topic destination topic
     * @param data  message payload
     * @param tags  optional tags appended to the destination; {@code null} or empty means
     *              the bare topic is used
     */
    public void asyncSendOrderly(String topic, byte[] data, Collection<String> tags) {
        Message<byte[]> message = MessageBuilder.withPayload(data).build();
        final String des = this.addTags(topic, tags);
        mqTemplate.asyncSend(des, message, new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                logger.trace("异步发送消息到Topic:{}成功", des);
            }

            @Override
            public void onException(Throwable e) {
                // The trailing Throwable is logged by SLF4J as the exception, not a placeholder.
                logger.error("异步发送消息到Topic:{}失败", des, e);
            }
        });
    }

}
